Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SPARK-1305: Support persisting RDD's directly to Tachyon #158

Closed
wants to merge 56 commits into from

Conversation

RongGu
Copy link
Contributor

@RongGu RongGu commented Mar 16, 2014

Move the PR#468 of apache-incubator-spark to the apache-spark
"Adding an option to persist Spark RDD blocks into Tachyon."

@RongGu
Copy link
Contributor Author

RongGu commented Mar 16, 2014

The PR is moved here, and the left commits will be push to this site then.

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished.

@AmplabJenkins
Copy link

One or more automated tests failed
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13201/

… PR#468 of apache-incubator-spark to the apache-spark
@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished.

@AmplabJenkins
Copy link

One or more automated tests failed
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13209/

@@ -97,6 +101,23 @@ private[spark] class BlockManager(
var asyncReregisterTask: Future[Unit] = null
val asyncReregisterLock = new Object

private def tachyonStore : TachyonStore = synchronized {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you make this a lazy val, it will have the same effect, be more efficient, and be less code

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, here it's fine to use lazy val.

@mateiz
Copy link
Contributor

mateiz commented Mar 19, 2014

This seems to be failing tests, have you run them locally?

@pwendell
Copy link
Contributor

Hey @RongGu we've made some changes to the block store that I think will require up-merging this patch to master. We're interested in merging this soon. Any chance you could take a look at this and bring it up to date? Thanks!

@RongGu
Copy link
Contributor Author

RongGu commented Mar 21, 2014

Hey @mateiz . Yes, I verified the whole thing last night. I set up a tachyon locally and the example I added using tachyonstore works well. Also, the BlockManagerSuite with my added test passed here.
BTW, the whole test never passed here on my laptop from the very beginning to now. Some test failed here because of my environment problem,such as OOM "Test org.apache.spark.JavaAPISuite.sparkContextUnion failed: java.lang.OutOfMemoryError: unable to create new native thread".
I would like to re-test the current version on the AmplabJenkins in order to check out current logs. The previous log link https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13209/ seems expired.

@RongGu
Copy link
Contributor Author

RongGu commented Mar 21, 2014

Hey, @pwendell . Thanks for your notification. I've been busy with the school stuff here since this semester started. There is still a bit work need to done in this PR. I guess several other PRs will be merged into trunk in next a few days. Thus, I would like to perform the up-merging work when I have done with the feature in this PR and before you think it's okay to merge this PR. In a word, I would like to perform the up-merging work at last. Is this plan work for you?
I will continue to work on this PR in the next weeks in a part-time way. I believe the code/features merged into the Apahe Spark should have high quality.

@AmplabJenkins
Copy link

Build triggered.

@AmplabJenkins
Copy link

Build started.

@AmplabJenkins
Copy link

Build finished.

@AmplabJenkins
Copy link

One or more automated tests failed
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13312/

@AmplabJenkins
Copy link

Build triggered.

@AmplabJenkins
Copy link

Build started.

@AmplabJenkins
Copy link

Build finished.

@AmplabJenkins
Copy link

One or more automated tests failed
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13318/

@haoyuan
Copy link
Contributor

haoyuan commented Mar 21, 2014

Jenkins, test this please

@AmplabJenkins
Copy link

Build triggered.

@AmplabJenkins
Copy link

Build started.

boolean useTachyon,
boolean deserialized,
int replication) {
return StorageLevel.apply(useDisk, useMemory, useTachyon, deserialized, replication);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't just delete the old method -- add a second version that takes the new useTachyon argument. Some user code will be using the old method and there's no reason to break it here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

@AmplabJenkins
Copy link

Merged build finished.

@AmplabJenkins
Copy link

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13781/

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished. All automated tests passed.

@AmplabJenkins
Copy link

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13782/

@haoyuan
Copy link
Contributor

haoyuan commented Apr 5, 2014

@mateiz Merged the master branch.

@mateiz
Copy link
Contributor

mateiz commented Apr 5, 2014

HY, it still doesn't merge. I see CONFLICT (content): Merge conflict in core/src/main/scala/org/apache/spark/util/Utils.scala.

Maybe you didn't update to the very latest one? Or maybe you got unlucky.

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished. All automated tests passed.

@AmplabJenkins
Copy link

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13788/

@haoyuan
Copy link
Contributor

haoyuan commented Apr 5, 2014

@mateiz See if it works now :)

@pwendell
Copy link
Contributor

pwendell commented Apr 5, 2014

Merging cleanly now. I merged this - thanks!

@asfgit asfgit closed this in b50ddfd Apr 5, 2014
@RongGu
Copy link
Contributor Author

RongGu commented Apr 5, 2014

It seems that I just missed our last part. I am so happy this PR is merged. Thank @pwendell, @mateiz and @aarondav for your valuable comments all the way. Also, I appreciate @haoyuan 's leading and helping on this PR.

@mateiz
Copy link
Contributor

mateiz commented Apr 5, 2014

No worries, thanks a lot to both you and Haoyuan for putting this together! This will be a great feature to have.

pdeyhim pushed a commit to pdeyhim/spark-1 that referenced this pull request Jun 25, 2014
Move the PR#468 of apache-incubator-spark to the apache-spark
"Adding an option to persist Spark RDD blocks into Tachyon."

Author: Haoyuan Li <[email protected]>
Author: RongGu <[email protected]>

Closes apache#158 from RongGu/master and squashes the following commits:

72b7768 [Haoyuan Li] merge master
9f7fa1b [Haoyuan Li] fix code style
ae7834b [Haoyuan Li] minor cleanup
a8b3ec6 [Haoyuan Li] merge master branch
e0f4891 [Haoyuan Li] better check offheap.
55b5918 [RongGu] address matei's comment on the replication of offHeap storagelevel
7cd4600 [RongGu] remove some logic code for tachyonstore's replication
51149e7 [RongGu] address aaron's comment on returning value of the remove() function in tachyonstore
8adfcfa [RongGu] address arron's comment on inTachyonSize
120e48a [RongGu] changed the root-level dir name in Tachyon
5cc041c [Haoyuan Li] address aaron's comments
9b97935 [Haoyuan Li] address aaron's comments
d9a6438 [Haoyuan Li] fix for pspark
77d2703 [Haoyuan Li] change python api.git status
3dcace4 [Haoyuan Li] address matei's comments
91fa09d [Haoyuan Li] address patrick's comments
589eafe [Haoyuan Li] use TRY_CACHE instead of MUST_CACHE
64348b2 [Haoyuan Li] update conf docs.
ed73e19 [Haoyuan Li] Merge branch 'master' of github.com:RongGu/spark-1
619a9a8 [RongGu] set number of directories in TachyonStore back to 64; added a TODO tag for duplicated code from the DiskStore
be79d77 [RongGu] find a way to clean up some unnecessay metods and classed to make the code simpler
49cc724 [Haoyuan Li] update docs with off_headp option
4572f9f [RongGu] reserving the old apply function API of StorageLevel
04301d3 [RongGu] rename StorageLevel.TACHYON to Storage.OFF_HEAP
c9aeabf [RongGu] rename the StorgeLevel.TACHYON as StorageLevel.OFF_HEAP
76805aa [RongGu] unifies the config properties name prefix; add the configs into docs/configuration.md
e700d9c [RongGu] add the SparkTachyonHdfsLR example and some comments
fd84156 [RongGu] use randomUUID to generate sparkapp directory name on tachyon;minor code style fix
939e467 [Haoyuan Li] 0.4.1-thrift from maven central
86a2eab [Haoyuan Li] tachyon 0.4.1-thrift is in the staging repo. but jenkins failed to download it. temporarily revert it back to 0.4.1
16c5798 [RongGu] make the dependency on tachyon as tachyon-0.4.1-thrift
eacb2e8 [RongGu] Merge branch 'master' of https://github.com/RongGu/spark-1
bbeb4de [RongGu] fix the JsonProtocolSuite test failure problem
6adb58f [RongGu] Merge branch 'master' of https://github.com/RongGu/spark-1
d827250 [RongGu] fix JsonProtocolSuie test failure
716e93b [Haoyuan Li] revert the version
ca14469 [Haoyuan Li] bump tachyon version to 0.4.1-thrift
2825a13 [RongGu] up-merging to the current master branch of the apache spark
6a22c1a [Haoyuan Li] fix scalastyle
8968b67 [Haoyuan Li] exclude more libraries from tachyon dependency to be the same as referencing tachyon-client.
77be7e8 [RongGu] address mateiz's comment about the temp folder name problem. The implementation followed mateiz's advice.
1dcadf9 [Haoyuan Li] typo
bf278fa [Haoyuan Li] fix python tests
e82909c [Haoyuan Li] minor cleanup
776a56c [Haoyuan Li] address patrick's and ali's comments from the previous PR
8859371 [Haoyuan Li] various minor fixes and clean up
e3ddbba [Haoyuan Li] add doc to use Tachyon cache mode.
fcaeab2 [Haoyuan Li] address Aaron's comment
e554b1e [Haoyuan Li] add python code
47304b3 [Haoyuan Li] make tachyonStore in BlockMananger lazy val; add more comments StorageLevels.
dc8ef24 [Haoyuan Li] add old storelevel constructor
e01a271 [Haoyuan Li] update tachyon 0.4.1
8011a96 [RongGu] fix a brought-in mistake in StorageLevel
70ca182 [RongGu] a bit change in comment
556978b [RongGu] fix the scalastyle errors
791189b [RongGu] "Adding an option to persist Spark RDD blocks into Tachyon." move the PR#468 of apache-incubator-spark to the apache-spark
ericl pushed a commit to ericl/spark that referenced this pull request Dec 30, 2016
## What changes were proposed in this pull request?
The ACL Thrift end to end test currently fail because they do not set the `spark.databricks.acl.enabled` flag. This PR fixes that.

## How was this patch tested?
This is a test.

Author: Herman van Hovell <[email protected]>

Closes apache#158 from hvanhovell/SC-5365-thrift.
mccheah pushed a commit to mccheah/spark that referenced this pull request Oct 12, 2017
jlopezmalla pushed a commit to jlopezmalla/spark that referenced this pull request Feb 27, 2018
cloud-fan pushed a commit that referenced this pull request Apr 20, 2022
### What changes were proposed in this pull request?
This PR adds a new optimizer rule `MergeScalarSubqueries` to merge multiple non-correlated `ScalarSubquery`s to compute multiple scalar values once.

E.g. the following query:
```
SELECT
  (SELECT avg(a) FROM t),
  (SELECT sum(b) FROM t)
```
is optimized from:
```
== Optimized Logical Plan ==
Project [scalar-subquery#242 [] AS scalarsubquery()#253, scalar-subquery#243 [] AS scalarsubquery()#254L]
:  :- Aggregate [avg(a#244) AS avg(a)#247]
:  :  +- Project [a#244]
:  :     +- Relation default.t[a#244,b#245] parquet
:  +- Aggregate [sum(a#251) AS sum(a)#250L]
:     +- Project [a#251]
:        +- Relation default.t[a#251,b#252] parquet
+- OneRowRelation
```
to:
```
== Optimized Logical Plan ==
Project [scalar-subquery#242 [].avg(a) AS scalarsubquery()#253, scalar-subquery#243 [].sum(a) AS scalarsubquery()#254L]
:  :- Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260]
:  :  +- Aggregate [avg(a#244) AS avg(a)#247, sum(a#244) AS sum(a)#250L]
:  :     +- Project [a#244]
:  :        +- Relation default.t[a#244,b#245] parquet
:  +- Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260]
:     +- Aggregate [avg(a#244) AS avg(a)#247, sum(a#244) AS sum(a)#250L]
:        +- Project [a#244]
:           +- Relation default.t[a#244,b#245] parquet
+- OneRowRelation
```
and in the physical plan subqueries are reused:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(1) Project [Subquery subquery#242, [id=#113].avg(a) AS scalarsubquery()#253, ReusedSubquery Subquery subquery#242, [id=#113].sum(a) AS scalarsubquery()#254L]
   :  :- Subquery subquery#242, [id=#113]
   :  :  +- AdaptiveSparkPlan isFinalPlan=true
         +- == Final Plan ==
            *(2) Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260]
            +- *(2) HashAggregate(keys=[], functions=[avg(a#244), sum(a#244)], output=[avg(a)#247, sum(a)#250L])
               +- ShuffleQueryStage 0
                  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#158]
                     +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#244), partial_sum(a#244)], output=[sum#262, count#263L, sum#264L])
                        +- *(1) ColumnarToRow
                           +- FileScan parquet default.t[a#244] Batched: true, DataFilters: [], Format: Parquet, Location: ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int>
         +- == Initial Plan ==
            Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260]
            +- HashAggregate(keys=[], functions=[avg(a#244), sum(a#244)], output=[avg(a)#247, sum(a)#250L])
               +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#110]
                  +- HashAggregate(keys=[], functions=[partial_avg(a#244), partial_sum(a#244)], output=[sum#262, count#263L, sum#264L])
                     +- FileScan parquet default.t[a#244] Batched: true, DataFilters: [], Format: Parquet, Location: ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int>
   :  +- ReusedSubquery Subquery subquery#242, [id=#113]
   +- *(1) Scan OneRowRelation[]
+- == Initial Plan ==
...
```

Please note that the above simple example could be easily optimized into a common select expression without reuse node, but this PR can handle more complex queries as well.

### Why are the changes needed?
Performance improvement.
```
[info] TPCDS Snappy:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] q9 - MergeScalarSubqueries off                    50798          52521        1423          0.0      Infinity       1.0X
[info] q9 - MergeScalarSubqueries on                     19484          19675         226          0.0      Infinity       2.6X

[info] TPCDS Snappy:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] q9b - MergeScalarSubqueries off                   15430          17803         NaN          0.0      Infinity       1.0X
[info] q9b - MergeScalarSubqueries on                     3862           4002         196          0.0      Infinity       4.0X
```
Please find `q9b` in the description of SPARK-34079. It is a variant of [q9.sql](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q9.sql) using CTE.
The performance improvement in case of `q9` comes from merging 15 subqueries into 5 and in case of `q9b` it comes from merging 5 subqueries into 1.

### Does this PR introduce _any_ user-facing change?
No. But this optimization can be disabled with `spark.sql.optimizer.excludedRules` config.

### How was this patch tested?
Existing and new UTs.

Closes #32298 from peter-toth/SPARK-34079-multi-column-scalar-subquery.

Lead-authored-by: Peter Toth <[email protected]>
Co-authored-by: attilapiros <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
cloud-fan pushed a commit that referenced this pull request Apr 20, 2022
### What changes were proposed in this pull request?
This PR adds a new optimizer rule `MergeScalarSubqueries` to merge multiple non-correlated `ScalarSubquery`s to compute multiple scalar values once.

E.g. the following query:
```
SELECT
  (SELECT avg(a) FROM t),
  (SELECT sum(b) FROM t)
```
is optimized from:
```
== Optimized Logical Plan ==
Project [scalar-subquery#242 [] AS scalarsubquery()#253, scalar-subquery#243 [] AS scalarsubquery()#254L]
:  :- Aggregate [avg(a#244) AS avg(a)#247]
:  :  +- Project [a#244]
:  :     +- Relation default.t[a#244,b#245] parquet
:  +- Aggregate [sum(a#251) AS sum(a)#250L]
:     +- Project [a#251]
:        +- Relation default.t[a#251,b#252] parquet
+- OneRowRelation
```
to:
```
== Optimized Logical Plan ==
Project [scalar-subquery#242 [].avg(a) AS scalarsubquery()#253, scalar-subquery#243 [].sum(a) AS scalarsubquery()#254L]
:  :- Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260]
:  :  +- Aggregate [avg(a#244) AS avg(a)#247, sum(a#244) AS sum(a)#250L]
:  :     +- Project [a#244]
:  :        +- Relation default.t[a#244,b#245] parquet
:  +- Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260]
:     +- Aggregate [avg(a#244) AS avg(a)#247, sum(a#244) AS sum(a)#250L]
:        +- Project [a#244]
:           +- Relation default.t[a#244,b#245] parquet
+- OneRowRelation
```
and in the physical plan subqueries are reused:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(1) Project [Subquery subquery#242, [id=#113].avg(a) AS scalarsubquery()#253, ReusedSubquery Subquery subquery#242, [id=#113].sum(a) AS scalarsubquery()#254L]
   :  :- Subquery subquery#242, [id=#113]
   :  :  +- AdaptiveSparkPlan isFinalPlan=true
         +- == Final Plan ==
            *(2) Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260]
            +- *(2) HashAggregate(keys=[], functions=[avg(a#244), sum(a#244)], output=[avg(a)#247, sum(a)#250L])
               +- ShuffleQueryStage 0
                  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#158]
                     +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#244), partial_sum(a#244)], output=[sum#262, count#263L, sum#264L])
                        +- *(1) ColumnarToRow
                           +- FileScan parquet default.t[a#244] Batched: true, DataFilters: [], Format: Parquet, Location: ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int>
         +- == Initial Plan ==
            Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260]
            +- HashAggregate(keys=[], functions=[avg(a#244), sum(a#244)], output=[avg(a)#247, sum(a)#250L])
               +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#110]
                  +- HashAggregate(keys=[], functions=[partial_avg(a#244), partial_sum(a#244)], output=[sum#262, count#263L, sum#264L])
                     +- FileScan parquet default.t[a#244] Batched: true, DataFilters: [], Format: Parquet, Location: ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int>
   :  +- ReusedSubquery Subquery subquery#242, [id=#113]
   +- *(1) Scan OneRowRelation[]
+- == Initial Plan ==
...
```

Please note that the above simple example could be easily optimized into a common select expression without reuse node, but this PR can handle more complex queries as well.

### Why are the changes needed?
Performance improvement.
```
[info] TPCDS Snappy:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] q9 - MergeScalarSubqueries off                    50798          52521        1423          0.0      Infinity       1.0X
[info] q9 - MergeScalarSubqueries on                     19484          19675         226          0.0      Infinity       2.6X

[info] TPCDS Snappy:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] q9b - MergeScalarSubqueries off                   15430          17803         NaN          0.0      Infinity       1.0X
[info] q9b - MergeScalarSubqueries on                     3862           4002         196          0.0      Infinity       4.0X
```
Please find `q9b` in the description of SPARK-34079. It is a variant of [q9.sql](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q9.sql) using CTE.
The performance improvement in case of `q9` comes from merging 15 subqueries into 5 and in case of `q9b` it comes from merging 5 subqueries into 1.

### Does this PR introduce _any_ user-facing change?
No. But this optimization can be disabled with `spark.sql.optimizer.excludedRules` config.

### How was this patch tested?
Existing and new UTs.

Closes #32298 from peter-toth/SPARK-34079-multi-column-scalar-subquery.

Lead-authored-by: Peter Toth <[email protected]>
Co-authored-by: attilapiros <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit e00b81e)
Signed-off-by: Wenchen Fan <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants